package disruptor;


import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
import com.lmax.disruptor.util.Util;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * WorkerPool contains a pool of {@link WorkProcessor}s that will consume sequences so jobs can be farmed out across a pool of workers.
 * Each of the {@link WorkProcessor}s manage and calls a {@link WorkHandler} to process the events.
 *
 * @param <T> event to be processed by a pool of workers
 */
public final class WorkerPool<T>
{
  private final AtomicBoolean started = new AtomicBoolean(false);
  private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
  private final RingBuffer<T> ringBuffer;
  // WorkProcessors are created to wrap each of the provided WorkHandlers
  private final WorkProcessor<?>[] workProcessors;

  /**
   * Create a worker pool to enable an array of {@link WorkHandler}s to consume published sequences.
   * <p>
   * This option requires a pre-configured {@link RingBuffer} which must have {@link RingBuffer#addGatingSequences(Sequence...)}
   * called before the work pool is started.
   *
   * @param ringBuffer       of events to be consumed.
   * @param sequenceBarrier  on which the workers will depend.
   * @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
   * @param workHandlers     to distribute the work load across.
   */
  @SafeVarargs
  public WorkerPool(
      final RingBuffer<T> ringBuffer,
      final SequenceBarrier sequenceBarrier,
      final ExceptionHandler<? super T> exceptionHandler,
      final WorkHandler<? super T>... workHandlers)
  {
    this.ringBuffer = ringBuffer;
    final int numWorkers = workHandlers.length;
    workProcessors = new WorkProcessor[numWorkers];

    for (int i = 0; i < numWorkers; i++)
    {
      workProcessors[i] = new WorkProcessor<>(
          ringBuffer,
          sequenceBarrier,
          workHandlers[i],
          exceptionHandler,
          workSequence);
    }
  }

  /**
   * Construct a work pool with an internal {@link RingBuffer} for convenience.
   * <p>
   * This option does not require {@link RingBuffer#addGatingSequences(Sequence...)} to be called before the work pool is started.
   *
   * @param eventFactory     for filling the {@link RingBuffer}
   * @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
   * @param workHandlers     to distribute the work load across.
   */
  @SafeVarargs
  public WorkerPool(
      final EventFactory<T> eventFactory,
      final ExceptionHandler<? super T> exceptionHandler,
      final WorkHandler<? super T>... workHandlers)
  {
    ringBuffer = RingBuffer.createMultiProducer(eventFactory, 1024, new BlockingWaitStrategy());
    final SequenceBarrier barrier = ringBuffer.newBarrier();
    final int numWorkers = workHandlers.length;
    workProcessors = new WorkProcessor[numWorkers];

    for (int i = 0; i < numWorkers; i++)
    {
      workProcessors[i] = new WorkProcessor<>(
          ringBuffer,
          barrier,
          workHandlers[i],
          exceptionHandler,
          workSequence);
    }

    ringBuffer.addGatingSequences(getWorkerSequences());
  }

  /**
   * Get an array of {@link Sequence}s representing the progress of the workers.
   *
   * @return an array of {@link Sequence}s representing the progress of the workers.
   */
  public Sequence[] getWorkerSequences()
  {
    final Sequence[] sequences = new Sequence[workProcessors.length + 1];
    for (int i = 0, size = workProcessors.length; i < size; i++)
    {
      sequences[i] = workProcessors[i].getSequence();
    }
    sequences[sequences.length - 1] = workSequence;

    return sequences;
  }

  /**
   * Start the worker pool processing events in sequence.
   *
   * @param executor providing threads for running the workers.
   * @return the {@link RingBuffer} used for the work queue.
   * @throws IllegalStateException if the pool has already been started and not halted yet
   */
  public RingBuffer<T> start(final Executor executor)
  {
    if (!started.compareAndSet(false, true))
    {
      throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
    }

    final long cursor = ringBuffer.getCursor();
    workSequence.set(cursor);

    for (WorkProcessor<?> processor : workProcessors)
    {
      processor.getSequence().set(cursor);
      executor.execute(processor);
    }

    return ringBuffer;
  }

  /**
   * Wait for the {@link RingBuffer} to drain of published events then halt the workers.
   */
  public void drainAndHalt()
  {
    Sequence[] workerSequences = getWorkerSequences();
    while (ringBuffer.getCursor() > Util.getMinimumSequence(workerSequences))
    {
      Thread.yield();
    }

    for (WorkProcessor<?> processor : workProcessors)
    {
      processor.halt();
    }

    started.set(false);
  }

  /**
   * Halt all workers immediately at the end of their current cycle.
   */
  public void halt()
  {
    for (WorkProcessor<?> processor : workProcessors)
    {
      processor.halt();
    }

    started.set(false);
  }

  public boolean isRunning()
  {
    return started.get();
  }
}

